In [1]:
import sys

if '..' not in sys.path:
    sys.path.append('..')
    
from draw_workflow import draw_workflow

Noodles

Easy concurrent programming in Python

Johan Hidding, Thursday 19-11-2015 @ NLeSC


In [2]:
from noodles import schedule, run, run_parallel, gather

But, why?

  • save the user's time
  • be flexible

Alternatives

  • What we discussed: Taverna, KNIME, Pegasus etc.
  • Celery
  • IPyParallel
  • Fireworks
  • Hadoop / Spark

Noodles parable (thank you Oscar!)

Start with an example

We start with a few functions that happen to exist somewhere out there.


In [3]:
@schedule
def add(a, b):
    return a+b

@schedule
def sub(a, b):
    return a-b

@schedule
def mul(a, b):
    return a*b

Our fledgling Python script kiddie then enters the following code:


In [4]:
u = add(5, 4)
v = sub(u, 3)
w = sub(u, 2)
x = mul(v, w)

In [5]:
draw_workflow('callgraph1.png', x._workflow)

resulting in this workflow:

We may run this in parallel!


In [6]:
run_parallel(x, n_threads = 2)


Out[6]:
42
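
To see where the parallelism comes from: `v` and `w` both depend only on `u`, so they can be computed at the same time. Below is a hand-written sketch of the same dependency structure, using only `concurrent.futures` from the standard library. It is not a description of how `run_parallel` works internally, and the `plain_*` functions are names made up for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def plain_add(a, b):
    return a + b


def plain_sub(a, b):
    return a - b


def plain_mul(a, b):
    return a * b


with ThreadPoolExecutor(max_workers=2) as pool:
    u_val = plain_add(5, 4)                       # 9, needed by both branches
    v_future = pool.submit(plain_sub, u_val, 3)   # does not depend on w
    w_future = pool.submit(plain_sub, u_val, 2)   # does not depend on v
    x_val = plain_mul(v_future.result(), w_future.result())

print(x_val)  # 42
```

Here the dependencies are wired up by hand; the point of the decorated version is that the same graph is derived from ordinary-looking function calls.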

How does it work?

  • Decorate functions to build a workflow
  • Use any back-end to run it
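
The two points above can be illustrated with a toy that uses only the standard library. This is a minimal sketch of the general idea (record calls first, evaluate later), not the Noodles implementation: `Call`, `toy_schedule` and `evaluate` are hypothetical names introduced here.

```python
from functools import wraps


class Call:
    """A recorded, not yet executed, function call (hypothetical helper)."""
    def __init__(self, func, args):
        self.func = func
        self.args = args


def toy_schedule(f):
    """Toy decorator: record the call instead of running it."""
    @wraps(f)
    def wrapped(*args):
        return Call(f, args)
    return wrapped


def evaluate(obj, cache=None):
    """A trivial sequential 'back-end': evaluate arguments, then the call."""
    if cache is None:
        cache = {}
    if not isinstance(obj, Call):
        return obj                      # plain values evaluate to themselves
    if id(obj) not in cache:
        values = [evaluate(a, cache) for a in obj.args]
        cache[id(obj)] = obj.func(*values)
    return cache[id(obj)]


@toy_schedule
def toy_add(a, b):
    return a + b


@toy_schedule
def toy_mul(a, b):
    return a * b


u = toy_add(5, 4)          # nothing is computed yet; u is a Call
x = toy_mul(u, u)          # u is shared; the cache evaluates it only once
toy_result = evaluate(x)
print(toy_result)          # 81
```

Because the graph is only data, `evaluate` could be swapped for a different runner (threads, processes, remote workers) without touching the decorated functions.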

The decorator


In [7]:
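from functools import wraps
from inspect import signature

def schedule(f):
    @wraps(f)
    def wrapped(*args, **kwargs):
        # Bind the call's arguments to f's signature and fill in defaults.
        bound_args = signature(f).bind(*args, **kwargs)
        bound_args.apply_defaults()
        # Instead of calling f, return a promise wrapping the new workflow.
        return PromisedObject(merge_workflow(f, bound_args))

    return wrapped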
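
The decorator relies on `inspect.signature(...).bind(...)` and `apply_defaults()` from the standard library. The short sketch below shows what those two calls produce; `example` is a hypothetical function used only for this demonstration.

```python
from inspect import signature


def example(a, b=2, *rest, flag=False):
    """Hypothetical function, used only to show what binding produces."""
    return a, b, rest, flag


bound = signature(example).bind(1, flag=True)
before = dict(bound.arguments)   # only the explicitly passed arguments
bound.apply_defaults()
after = dict(bound.arguments)    # defaults filled in, *rest becomes ()

print(before)   # {'a': 1, 'flag': True}
print(after)    # {'a': 1, 'b': 2, 'rest': (), 'flag': True}
```

After `apply_defaults()` every parameter has a named entry, so each argument of a call can be addressed by name, whether it was passed positionally or by keyword.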

Mocking a 'real' Python object


In [8]:
class PromisedObject:
    def __init__(self, workflow):
        self._workflow = workflow

    def __call__(self, *args, **kwargs):
        return _do_call(self._workflow, *args, **kwargs)

    def __getattr__(self, attr):
        if attr[0] == '_':
            return self.__dict__[attr]

        return _getattr(self._workflow, attr)

    def __setattr__(self, attr, value):
        if attr[0] == '_':
            self.__dict__[attr] = value
            return

        self._workflow = get_workflow(_setattr(self._workflow, attr, value))
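
The underscore test in `__getattr__` and `__setattr__` separates the object's own bookkeeping (`_workflow`) from attribute access that should become part of the workflow. A toy version of that dispatch, with a hypothetical `Recorder` class that logs public accesses instead of building workflow nodes:

```python
class Recorder:
    """Toy proxy: private names are stored, public ones are only logged."""
    def __init__(self):
        self._log = []          # starts with '_', so it lands in __dict__

    def __getattr__(self, attr):
        # Only called when normal attribute lookup fails.
        if attr[0] == '_':
            raise AttributeError(attr)   # choice made for this sketch
        self._log.append(('get', attr))
        return self

    def __setattr__(self, attr, value):
        if attr[0] == '_':
            self.__dict__[attr] = value
            return
        self._log.append(('set', attr, value))


r = Recorder()
r.second = 7        # recorded, not stored on the object
r.value             # recorded as well
print(r._log)       # [('set', 'second', 7), ('get', 'value')]
```

Writing to `self.__dict__` directly in `__setattr__` avoids calling `__setattr__` recursively.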

Merging workflows into a function call


In [9]:
from inspect import Parameter

def merge_workflow(f, bound_args):
    # Name of the *args parameter, if f has one.
    variadic = next((x.name for x in bound_args.signature.parameters.values()
                     if x.kind == Parameter.VAR_POSITIONAL), None)

    if variadic:
        bound_args.arguments[variadic] = list(bound_args.arguments[variadic])

    node = FunctionNode(f, bound_args)

    idx = id(node)
    nodes = {idx: node}
    links = {idx: set()}

    for address in serialize_arguments(bound_args):
        workflow = get_workflow(
            ref_argument(bound_args, address))

        if not workflow:
            continue

        set_argument(bound_args, address, Parameter.empty)
        for n in workflow.nodes:
            if n not in nodes:
                nodes[n] = workflow.nodes[n]
                links[n] = set()

            links[n].update(workflow.links[n])

        links[workflow.top].add((idx, address))

    return Workflow(id(node), nodes, links)
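
The inner loop of `merge_workflow` takes the union of the node and link dictionaries of all argument workflows, so a node shared by several arguments appears only once. A stripped-down sketch of just that step follows. It assumes string keys and string labels for readability where the code above uses `id(node)` keys and `FunctionNode` objects, and `merge_links` is a hypothetical helper.

```python
def merge_links(workflows):
    """Union several (nodes, links) pairs; shared nodes are kept once."""
    nodes, links = {}, {}
    for wf_nodes, wf_links in workflows:
        for n in wf_nodes:
            if n not in nodes:
                nodes[n] = wf_nodes[n]
                links[n] = set()
            links[n].update(wf_links[n])
    return nodes, links


# Two hypothetical sub-workflows that both contain node 'u'.
left = ({'u': 'add(5, 4)', 'v': 'sub(u, 3)'},
        {'u': {('v', 'a')}, 'v': set()})
right = ({'u': 'add(5, 4)', 'w': 'sub(u, 2)'},
         {'u': {('w', 'a')}, 'w': set()})

nodes, links = merge_links([left, right])
print(sorted(nodes))        # ['u', 'v', 'w']
print(sorted(links['u']))   # [('v', 'a'), ('w', 'a')]
```

Each link is a `(target, argument)` pair, mirroring the `(idx, address)` tuples added for the top node of every argument workflow.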

eeeehm, What can we do (sort of)?

  • embarrassingly parallel loops
  • embedded workflows
  • empirical member assignment

loops


In [10]:
from noodles import schedule, run, run_parallel, gather

In [11]:
@schedule
def sum(a, builtin_sum=sum):
    # The default captures the built-in sum before this name shadows it.
    return builtin_sum(a)

In [12]:
r1 = add(1, 1)
r2 = sub(3, r1)

def foo(a, b, c):
    return mul(add(a, b), c)

multiples = [foo(i, r2, r1) for i in range(6)]

r5 = sum(gather(*multiples))

In [13]:
draw_workflow('callgraph2.png', r5._workflow)


In [14]:
run_parallel(r5, n_threads = 4)


Out[14]:
42
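
As a sanity check, the same arithmetic in plain Python without any scheduling (the `*_plain` names are introduced only for this check):

```python
# Same arithmetic as the workflow above, evaluated eagerly.
r1_plain = 1 + 1            # 2
r2_plain = 3 - r1_plain     # 1
multiples_plain = [(i + r2_plain) * r1_plain for i in range(6)]
total_plain = sum(multiples_plain)
print(multiples_plain, total_plain)   # [2, 4, 6, 8, 10, 12] 42
```

The six `foo` calls are independent of each other once `r1` and `r2` are known, which is what makes the loop embarrassingly parallel.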

embedded workflows


In [15]:
@schedule
def sqr(a):
    return a*a

@schedule
def map(f, lst):
    return gather(*[f(x) for x in lst])

@schedule
def num_range(a, b):
    return range(a, b)

In [16]:
wf = sum(map(sqr, num_range(0, 1000)))

In [17]:
draw_workflow('callgraph3.png', wf._workflow)


In [18]:
run_parallel(wf, n_threads=4)


Out[18]:
332833500
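
The result can be checked against the closed form for the sum of squares, sum of k² for k = 0 .. n-1 equals (n-1)·n·(2n-1)/6, here with n = 1000:

```python
n = 1000
closed_form = (n - 1) * n * (2 * n - 1) // 6   # sum of k**2 for k = 0 .. n-1
direct = sum(k * k for k in range(n))
print(closed_form, direct)   # 332833500 332833500
```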

Using objects

Golden rule

  • if you change something, return it
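
One way to read this rule, assuming that every operation on a promised object yields a new object describing the extended workflow: if the result is not kept, the step is lost. The toy below illustrates only that reading; `Step` and `then` are hypothetical and not part of Noodles.

```python
class Step:
    """Toy deferred value: each operation returns a new Step (hypothetical)."""
    def __init__(self, history):
        self.history = history

    def then(self, name):
        # Return a new object; the original is left untouched.
        return Step(self.history + [name])


a_lost = Step(['A(5)'])
a_lost.then('multiply(10)')            # result discarded: step not recorded

a_kept = Step(['A(5)'])
a_kept = a_kept.then('multiply(10)')   # rebinding keeps the step

print(a_lost.history)   # ['A(5)']
print(a_kept.history)   # ['A(5)', 'multiply(10)']
```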

In [19]:
@schedule
class A:
    def __init__(self, value):
        self.value = value

    def multiply(self, factor):
        self.value *= factor   # modifies self but does not return it

In [20]:
a = A(5)
a.multiply(10)
a.second = 7

In [21]:
draw_workflow("callgraph4.png", a._workflow)


In [22]:
@schedule
class A:
    def __init__(self, value):
        self.value = value

    def multiply(self, factor):
        self.value *= factor
        return self

In [23]:
a = A(5)
a = a.multiply(10)
a.second = 7

In [24]:
draw_workflow("callgraph5.png", a._workflow)


In [25]:
result = run_parallel(a, n_threads=4)
print(result.value, result.second)


50 7

Questions / Suggestions